Added support for Idempotent Producer #1152
@eapache Is this a feature you would be interested in? If I'm not mistaken, this would be the first third-party client to support it. It still needs more work and I plan to complete it this week, so I'd like to know if, for any reason, it's not something you'd want.
I've also just started working on this; I was not aware that you already got the TX producer running.
@buyology We've not solved that issue yet. Our plan was to try requeuing failed PartitionSets directly into the output channel of BrokerProducer. Then, assuming MaxOpenRequests is 1 (as it was when Idempotent was first released in Java in 0.11), the producer should recover. We've not tested that yet, though. Did you try something similar, or some other method?
Made a first pass over the PR and left a few comments. The overall approach taken seems very reasonable to me!
@@ -67,6 +67,9 @@ type Client interface {
	// in local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves information required for Idempotent Producer
	InitProducerID() (*InitProducerIDResponse, error)
Does it make sense to allow a Client to be reused to create multiple transactional producers? I'm thinking of the fact that the transactional producer would also use the InitProducerID request, but would also need to specify the TransactionID and TransactionTimeout somewhere. I guess those would then have to be in the Config, in which case I guess the answer would be no.
We think a future implementation of the transactional producer should change the implementation of InitProducerID() to get the TXID from the config.
Reusing a client with a TXID set in the config to create a new producer would result in the old producer being fenced off as a zombie. This is similar to what happens with the Java client if you reuse the config.
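To make the fencing point concrete, here is a minimal sketch of epoch-based fencing. It is not code from this PR or from sarama: the `coordinator` type, its methods, and the `tx-1` ID are hypothetical names, and it assumes the coordinator keeps one epoch per TXID and bumps it each time a producer initializes with that TXID.

```go
package main

import (
	"errors"
	"fmt"
)

// errFenced is a hypothetical error for a producer holding a stale epoch.
var errFenced = errors.New("producer fenced: a newer producer uses this transactional ID")

// coordinator is a hypothetical stand-in for the component that hands out
// epochs; it only tracks the current epoch per transactional ID.
type coordinator struct {
	epochs map[string]int16
}

func newCoordinator() *coordinator {
	return &coordinator{epochs: map[string]int16{}}
}

// initProducerID returns the epoch for txID, bumping it if the ID was
// already registered (assumption: every re-initialization bumps the epoch).
func (c *coordinator) initProducerID(txID string) int16 {
	epoch, seen := c.epochs[txID]
	if seen {
		epoch++
	}
	c.epochs[txID] = epoch
	return epoch
}

// checkEpoch rejects requests that carry an epoch older than the current one.
func (c *coordinator) checkEpoch(txID string, epoch int16) error {
	if epoch < c.epochs[txID] {
		return errFenced
	}
	return nil
}

func main() {
	c := newCoordinator()
	oldEpoch := c.initProducerID("tx-1") // first producer
	newEpoch := c.initProducerID("tx-1") // second producer reusing the same TXID
	fmt.Println(c.checkEpoch("tx-1", oldEpoch))
	fmt.Println(c.checkEpoch("tx-1", newEpoch))
}
```

In this sketch the first producer's requests are rejected as soon as the second one initializes, which is the zombie behaviour described above.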
	}
	if c.Net.MaxOpenRequests > 1 {
		return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
	}
The Java client provides some fallbacks for these automatically, but I guess this is more in line with how sarama does configuration in general.
Yes, we stuck to the sarama way of not rewriting the config under the covers.
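For readers following along, the "validate, don't rewrite" choice can be sketched in isolation. This is a standalone illustration, not the PR's code: `idempotentConfig` is a hypothetical, trimmed-down stand-in for the two settings involved, and only the error message is taken from the diff above.

```go
package main

import (
	"errors"
	"fmt"
)

// idempotentConfig is a hypothetical stand-in holding only the two settings
// discussed in this thread; it is not the library's Config type.
type idempotentConfig struct {
	Idempotent      bool
	MaxOpenRequests int
}

// errMaxOpenRequests mirrors the message used in the diff above.
var errMaxOpenRequests = errors.New("Idempotent producer requires Net.MaxOpenRequests to be 1")

// validate reports an inconsistent configuration instead of silently
// lowering MaxOpenRequests on the caller's behalf.
func (c idempotentConfig) validate() error {
	if c.Idempotent && c.MaxOpenRequests > 1 {
		return errMaxOpenRequests
	}
	return nil
}

func main() {
	fmt.Println(idempotentConfig{Idempotent: true, MaxOpenRequests: 5}.validate())
	fmt.Println(idempotentConfig{Idempotent: true, MaxOpenRequests: 1}.validate())
}
```

The trade-off is that users get an explicit error at construction time rather than a config that quietly differs from what they wrote.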
async_producer.go (outdated)
		msg.retries++
	}
	// extremely pessimistic strategy - refreshing metadata for every batch retried. Should be improved
	err := p.client.RefreshMetadata(topic)
I guess these would basically be the InvalidMetadataExceptions?
Sorry, can you expand on the comment? We're unsure what you are pointing out.
About this snippet: we're not ecstatic about the very pessimistic strategy of refreshing metadata for every retried batch. We're going to push a small commit to do only one refresh.
 	// Other non-retriable errors
 	default:
-		bp.parent.returnErrors(msgs, block.Err)
+		bp.parent.returnErrors(pSet.msgs, block.Err)
The Java client compensates subsequent sequence numbers due to any non-retriable errors:
I guess we would have to do something similar.
Correct. Currently, upon hitting a non-retriable error, we do not attempt to recover from it automatically. Users will have to create a new Producer to get a new ProducerId.
As you pointed out, the Java client tries to recover automatically (by fetching a new ProducerId and even resequencing messages in some cases), but we feel this can be done as a further improvement in a follow-up PR.
@bai Can you take a look?
Thanks @mimaison, looks solid. Could you please rebase your branch to get it tested against Go 1.11 too?
Sure, I'll do that this week (hopefully today or tomorrow!)
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com> Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Unit test with mock broker Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com> Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
@bai We rebased the PR. One of the Travis jobs failed while installing dependencies, but all the others passed. I don't seem to be able to retrigger a build to rerun it.
Thank you!
Many thanks for contributing this 👍
This PR adds support for the Idempotent Producer.
If the Idempotent configuration is enabled, upon startup the producer will retrieve a ProducerID and add it as well as sequence numbers to all produce requests.
This is still not complete and has not been properly tested yet. Quick tests showed that when retrying with idempotency, messages that have already been written to disk by brokers are correctly marked as duplicates.
This should be enough to start a discussion. Have a look and see if it's something you'd like merged. If so, I'll complete testing.
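As a rough illustration of why a retried message is recognized as a duplicate, here is a simplified model of the broker-side check. It is an assumption-laden sketch, not the PR's code or the actual broker logic: `partitionLog`, `write`, and the outcome names are hypothetical, and it tracks one sequence number per message rather than per batch.

```go
package main

import "fmt"

// outcome describes what the simplified log did with a message.
type outcome int

const (
	appended   outcome = iota // message was new and written
	duplicate                 // sequence already seen; not written again
	outOfOrder                // sequence skips ahead; rejected
)

// partitionLog is a hypothetical model of one partition that remembers the
// last accepted sequence number for each ProducerID.
type partitionLog struct {
	lastSeq  map[int64]int32
	messages []string
}

func newPartitionLog() *partitionLog {
	return &partitionLog{lastSeq: map[int64]int32{}}
}

// write accepts a message only if its sequence number is the next expected
// one for that producer (assumption: sequences start at 0 and grow by 1).
func (l *partitionLog) write(producerID int64, seq int32, msg string) outcome {
	next := int32(0)
	if last, known := l.lastSeq[producerID]; known {
		next = last + 1
	}
	switch {
	case seq == next:
		l.lastSeq[producerID] = seq
		l.messages = append(l.messages, msg)
		return appended
	case seq < next:
		return duplicate
	default:
		return outOfOrder
	}
}

func main() {
	log := newPartitionLog()
	fmt.Println(log.write(1, 0, "a") == appended)
	fmt.Println(log.write(1, 0, "a") == duplicate) // retry of the same message
	fmt.Println(len(log.messages))
}
```

Under these assumptions a retry carries the same ProducerID and sequence number as the original send, so the log can drop it without writing it twice.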